package cn.pink.core;

import cn.pink.core.gen.proxy.RPCImplBase;
import cn.pink.core.scheduler.ScheduleJob;
import cn.pink.core.scheduler.ScheduleMethod;
import cn.pink.core.scheduler.ScheduleTask;
import cn.pink.core.support.Log;
import cn.pink.core.support.SysException;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.quartz.JobKey.jobKey;

/**
 * Base class for services; provides scheduler support and the RPC entry points.
 * Objects extending this class can be invoked remotely through a Proxy.
 * Keep the number of scheduler tasks low: they increase heap usage
 * (see the load-test records for details).
 *
 * @Author: pink
 * @Date: 2022/6/16 18:01
 */
public abstract class Service {
    /** Port this service belongs to. */
    protected Port port;

    /** Cached methods of this class that carry the {@link ScheduleMethod} annotation. */
    private final List<Method> scheMethods = new ArrayList<>();
    /**
     * Scheduled tasks waiting to be executed on the next pulse.
     * The queue is handed to every Quartz job through its JobDataMap (key "scheduler");
     * presumably ScheduleJob enqueues the fired task here - verify against ScheduleJob.
     */
    private final ConcurrentLinkedQueue<ScheduleTask> schedulerList = new ConcurrentLinkedQueue<>();

    /** Cached generated proxy ("<ClassName>Impl") used to resolve RPC functions. */
    private RPCImplBase methodFunctionProxy;

    /**
     * @param port the port that owns this service
     */
    public Service(Port port) {
        this.port = port;
    }

    /**
     * Returns the id of this service.
     */
    public abstract Object getId();

    /**
     * Starts the service by registering every {@link ScheduleMethod}-annotated method
     * as a cron job. Need not be called when the port does not run timers.
     */
    public void startup() {
        scheduleInit();
    }

    /**
     * Heartbeat: runs the queued scheduled tasks first, then the subclass hook.
     */
    public final void pulse() {
        pulseSchedulers();
        pulseOverride();
    }

    /**
     * Heartbeat hook for subclasses; does nothing by default.
     */
    public void pulseOverride() {}

    //----------------------------------------------------------------------------
    // Scheduling

    /**
     * Drains the queue of pending scheduled tasks and executes each one.
     */
    private void pulseSchedulers() {
        ScheduleTask task;
        while ((task = schedulerList.poll()) != null) {
            try {
                task.execute();
            } catch (Exception e) {
                // Only log the failure: one broken task must not keep the remaining
                // queued tasks waiting until the next heartbeat.
                Log.system.error("", e);
            }
        }
    }

    /**
     * Collects the {@link ScheduleMethod}-annotated public methods of this class and
     * registers one cron job per cron expression declared on each of them.
     */
    private void scheduleInit() {
        for (Method candidate : this.getClass().getMethods()) {
            if (candidate.isAnnotationPresent(ScheduleMethod.class)) {
                scheMethods.add(candidate);
            }
        }

        for (final Method method : scheMethods) {
            ScheduleMethod anno = method.getAnnotation(ScheduleMethod.class);

            for (String cronStr : anno.value()) {
                scheduleCron(new ScheduleTask("servId", getId()) {
                    @Override
                    public String getJobGroup() {
                        return getId().toString();
                    }

                    @Override
                    public void execute() {
                        try {
                            // Resolve the service again through the current port by its id
                            // instead of invoking on the captured instance.
                            Port current = Port.getCurrent();
                            Service serv = current.getService(getParam().get("servId"));
                            method.invoke(serv);
                        } catch (Exception e) {
                            throw new SysException(e, "service初始化schedule错误，serviceId:{}", getParam().<Object>get("servId"));
                        }
                    }
                }, cronStr);
            }
        }
    }

    /**
     * Executes the task once after {@code delay} milliseconds.
     *
     * @param task  task to schedule
     * @param delay delay in milliseconds before the task fires
     */
    public void scheduleOnce(ScheduleTask task, long delay) {
        // NOTE(review): this uses the port clock while the other schedule methods use
        // System.currentTimeMillis() - confirm which one is intended.
        task.triggerAt = port.getTimeCurrent() + delay;

        SimpleScheduleBuilder sche = SimpleScheduleBuilder.repeatSecondlyForever();
        // A repeat count of 0 means no extra repetitions, i.e. fire exactly once.
        sche.withRepeatCount(0);

        scheduleUtils(task, sche, delay);
    }

    /**
     * Executes the task repeatedly every {@code period} milliseconds, starting after
     * {@code delay} milliseconds.
     *
     * @param task   task to schedule
     * @param delay  delay in milliseconds before the first execution
     * @param period interval in milliseconds between executions
     */
    public void schedulePeriod(ScheduleTask task, long delay, long period) {
        task.triggerAt = System.currentTimeMillis() + delay;
        task.triggerPeriod = period;

        SimpleScheduleBuilder sche = SimpleScheduleBuilder.repeatSecondlyForever();
        // Replace the default one-second interval with the requested period.
        sche.withIntervalInMilliseconds(period);

        scheduleUtils(task, sche, delay);
    }

    /**
     * Executes the task {@code count} times, every {@code period} milliseconds,
     * starting after {@code delay} milliseconds.
     *
     * @param task   task to schedule
     * @param delay  delay in milliseconds before the first execution
     * @param period interval in milliseconds between executions
     * @param count  total number of executions
     */
    public void scheduleRepeatForTotalCount(ScheduleTask task, long delay, long period, int count) {
        task.triggerAt = System.currentTimeMillis() + delay;
        task.triggerPeriod = period;

        SimpleScheduleBuilder sche = SimpleScheduleBuilder.repeatSecondlyForTotalCount(count);
        // Replace the default one-second interval with the requested period.
        sche.withIntervalInMilliseconds(period);

        scheduleUtils(task, sche, delay);
    }

    /**
     * Schedules the task with a cron expression.
     *
     * @param task    task to schedule
     * @param cronStr cron expression in Quartz format
     */
    public void scheduleCron(ScheduleTask task, String cronStr) {
        task.triggerCronStr = cronStr;

        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronStr);

        scheduleUtils(task, scheduleBuilder, 0);
    }

    /**
     * Registers the task on the port's scheduler with the given schedule.
     *
     * @param task            task to schedule; its state, scheduler, job key and trigger
     *                        are filled in once the job has been registered
     * @param scheduleBuilder schedule that drives the trigger
     * @param delay           delay in milliseconds before the trigger starts;
     *                        values {@code <= 0} start immediately
     * @throws SysException wrapping any failure raised while building or registering the job
     */
    private void scheduleUtils(ScheduleTask task,
                               ScheduleBuilder<?> scheduleBuilder, long delay) {
        try {
            // Start time of the trigger.
            Date startAt = new Date();
            if (delay > 0) {
                startAt = new Date(System.currentTimeMillis() + delay);
            }

            // The job carries the task itself plus the queue of this service.
            JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class).withIdentity(task.jobKey).build();
            jobDetail.getJobDataMap().put("task", task);
            jobDetail.getJobDataMap().put("scheduler", schedulerList);

            Trigger trigger = TriggerBuilder.newTrigger().startAt(startAt).withSchedule(scheduleBuilder).build();

            // Bind job and trigger.
            port.scheduler.scheduleJob(jobDetail, trigger);

            // Record the scheduling details on the task.
            task.state = ScheduleTask.STATE_WAITING;
            task.sched = port.scheduler;
            task.jobKey = jobDetail.getKey();
            task.trigger = trigger;
        } catch (Exception e) {
            throw new SysException(e);
        }
    }

    /**
     * Deletes the job named {@code jobName} in the default job group of this port.
     * Does nothing when the port has no scheduler.
     *
     * @param jobName name of the job to delete
     * @throws SchedulerException if the scheduler fails to delete the job
     */
    protected void deleteSchedulerJob(String jobName) throws SchedulerException {
        if (port.scheduler == null) {
            return;
        }

        // A null group makes Quartz fall back to its default group.
        port.scheduler.deleteJob(jobKey(jobName, null));
    }

    /**
     * Deletes the job identified by {@code jobName} and {@code jobGroupName} on this port.
     * Does nothing when the port has no scheduler.
     *
     * @param jobName      name of the job to delete
     * @param jobGroupName group of the job to delete
     * @throws SchedulerException if the scheduler fails to delete the job
     */
    protected void deleteSchedulerJob(String jobName, String jobGroupName) throws SchedulerException {
        if (port.scheduler == null) {
            return;
        }

        port.scheduler.deleteJob(jobKey(jobName, jobGroupName));
    }

    /**
     * Deletes every job on this port whose group is exactly {@code jobGroupName}.
     * Does nothing when the port has no scheduler or the group name is {@code null}.
     *
     * @param jobGroupName name of the job group to clear
     * @throws SchedulerException if the scheduler fails to list or delete jobs
     */
    protected void deleteSchedulerJobsByGroup(String jobGroupName) throws SchedulerException {
        // Only the central server needs Quartz timers; other servers run without a
        // scheduler for performance reasons.
        if (port.scheduler == null || jobGroupName == null) {
            return;
        }

        // Match the group name exactly: a suffix match would also delete the jobs of
        // unrelated groups that merely end with this name (e.g. "11" when clearing "1").
        for (JobKey key : port.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(jobGroupName))) {
            port.scheduler.deleteJob(key);
        }
    }

    //----------------------------------------------------------------------------
    // RPC calls

    /**
     * Returns the cached RPC proxy of this service, creating it on first use.
     * The proxy class is looked up as "<runtime class name>Impl" and instantiated
     * through its no-arg constructor, which may be non-public.
     */
    private RPCImplBase loadMethodFunctionProxy() throws ReflectiveOperationException {
        if (methodFunctionProxy == null) {
            Class<?> cls = Class.forName(getClass().getName() + "Impl");
            Constructor<?> c = cls.getDeclaredConstructor();
            c.setAccessible(true);
            methodFunctionProxy = (RPCImplBase) c.newInstance();
        }
        return methodFunctionProxy;
    }

    /**
     * Resolves the RPC function registered under {@code funcKey}.
     *
     * @param funcKey key of the RPC function
     * @return the function reference supplied by the proxy
     * @throws SysException wrapping any failure while loading the proxy or resolving the function
     */
    public <T> T getMethodFunction(int funcKey) {
        try {
            return loadMethodFunctionProxy().getMethodFunction(this, funcKey);
        } catch (Exception e) {
            throw new SysException(e);
        }
    }

    /**
     * Returns the lock setting of the RPC function registered under {@code funcKey}.
     *
     * @param funcKey key of the RPC function
     * @return the lock timeout; a negative value means the function takes no lock
     * @throws SysException wrapping any failure while loading the proxy or querying it
     */
    public int getMethodLock(int funcKey) {
        try {
            return loadMethodFunctionProxy().getMethodLock(this, funcKey);
        } catch (Exception e) {
            throw new SysException(e);
        }
    }

    /**
     * Builds a new call point from the port's node id, the port id and this service's id.
     */
    public CallPoint getCallPoint() {
        return new CallPoint(port.getNodeId(), port.getId(), getId());
    }

    /**
     * Returns the port this service belongs to.
     */
    public Port getPort() {
        return port;
    }
}
